package com.alisonyu.spider;
import com.alisonyu.spider.Downloader.Download;
import com.alisonyu.spider.Downloader.Request;
import com.alisonyu.spider.Proccessor.Context;
import com.alisonyu.spider.Proccessor.Process;
import com.alisonyu.spider.Router.Router;
import com.alisonyu.spider.Scheduler.Schedule;
import io.vertx.core.Vertx;
import java.util.List;
import java.util.concurrent.*;
import java.util.function.Consumer;

/**
 *爬虫框架核心类
 * Created by yu on 2017/9/23.
 */
public class Spider
{
    private static Vertx vertx;
    private Router router;
    private Schedule scheduler;
    private Download downloader;
    private Process processor;
    //是否启动
    private volatile boolean on=true;
    //是否停止
    private volatile boolean pause=false;
    //访问间隔,单位毫秒
    private int interval =0;
    private int waitingNextRequestInterval = 2;
    //定时任务
    private Consumer<Spider> timingTask;
    //定时任务间隔
    private long timingTaskInterval=0;
    //工作线程数
    private int workerThreadsAmount=5;
    private int instanceAmount = 1;
    //worker线程池，用于作耗时操作，例如IO操作等
    public static ExecutorService workerPool;


    public Spider(SpiderConfig config){
        SpiderFactory factory = new SpiderFactory(config);
        vertx = factory.getVertx();
        this.downloader = factory.getDownloader();
        this.scheduler = factory.getScheduler();
        this.processor = factory.getProcessor();
        this.router=factory.getRouter();
        this.interval = config.getInterval();
        this.instanceAmount = config.getInstanceAmount();
        initRequest(config.getInitRequests());
        initUrl(config.getInitUrls());
        autoClose();
        config.clear();
    }

    //开始爬虫，会重启线程
    public void start(){
        //爬虫线程
        Thread spiderThread=new Thread(this::spider);
        //开启线程
        workerPool=Executors.newFixedThreadPool(this.workerThreadsAmount);
        spiderThread.start();
        //设置定时任务
        if(timingTask!=null){
            vertx.setPeriodic(timingTaskInterval,(time)->{
                this.timingTask.accept(this);
            });
        }
    }

    public void autoClose(){
        vertx.setPeriodic(1000*5*60,id->{
           if (scheduler.isEmpty()){
               this.stop();
           }
        });
    }

    //停止爬虫，会终结线程
    public void stop(){
        System.out.println("stop");
        this.on=false;
        this.pause = true;
        if(workerPool!=null){
            workerPool.shutdown();
        }
        vertx.close();
        System.out.println("close2");
    }

    //暂停爬虫，不需要终结线程
    public void pause(){
        this.pause=true;
    }

    //重新开始爬虫,不需要重新初始化线程
    public void resume(){
        this.pause=false;
    }


    public void initUrl(List<String> urlList){
        urlList.stream().map(Request::new).forEach(rq->scheduler.putRequest(rq));
    }

    public void initUrl(String... urls){
        for(String url:urls){
            scheduler.putRequest(new Request(url));
        }
    }

    public void initRequest(List<Request> requestsList){
        requestsList.forEach(rq->scheduler.putRequest(rq));
    }

    //设置访问1次URL的间隔，推荐不设置间隔
    public Spider setInterval(int mills){
        this.interval =mills;
        return this;
    }

    //默认为1，用于初始化workers
    public Spider setWorkerAmount(int num){
        this.workerThreadsAmount=num;
        return this;
    }

    //设置定时爬虫任务
    public Spider setTimingTask(long interval,Consumer<Spider> task){
        this.timingTaskInterval=interval;
        this.timingTask=task;
        return this;
    }


    public static void handleTask(Runnable runnable){
        workerPool.submit(runnable);
    }

    public static Future handleTask(Callable callable){
        return workerPool.submit(callable);
    }

    private void spider(){
        int cnt = 0;
        while(on){
            while(!pause){
                try {
                    Request request=scheduler.getRequest();
                    if (request==null){
                        TimeUnit.SECONDS.sleep(waitingNextRequestInterval);
                        continue;
                    }
                    if(this.interval >0 && cnt % instanceAmount == 0){
                        TimeUnit.MILLISECONDS.sleep(this.interval);
                    }
                    downloader.asyncDownload(request).thenAccept(resp->{
                            processor.process(new Context(request,resp,this));
                    });
                } catch (Exception e) {
                    e.printStackTrace();
                }
                cnt++;
            }
        }
        System.out.println("existed");
    }

    public void addToTaskQueue(String uri){
        scheduler.putUrl(uri);
    }


    public static Vertx getVertx(){
        return vertx;
    }


}
